package com.data.dev.flink.dnsTopic.main;

import com.data.dev.elasticsearch.ElasticSearchInfo;
import com.data.dev.elasticsearch.SinkToEs;

import com.data.dev.flink.dnsTopic.OperationForBlackListCheck.DnsMsgFilter;
import com.data.dev.flink.dnsTopic.OperationForBlackListCheck.DnsMsgToBeanMapper;
import com.data.dev.flink.dnsTopic.OperationForBlackListCheck.DnsMsgToStrMapper;
import com.data.dev.kafka.KafkaSourceBuilder;
import com.data.dev.key.ConfigurationKey;
import com.data.dev.flink.FlinkEnv;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;

import java.io.Serializable;

/**
 * Flink job: inspects the Kafka DNS topic and raises blacklist alerts.
 * 1. Parses domain names from Kafka DNS topic messages and checks them against the blacklist.
 * 2. Parses IP addresses from Kafka DNS topic messages and checks them against the blacklist.
 *
 * @author wangxiaoming-ghq 2022-05-10
 */

@Slf4j
public class DnsMsg implements Serializable {
    /** Target Elasticsearch index for blacklist alert documents. */
    public static final String INDEX_NAME = ElasticSearchInfo.ES_BLACKLIST_INDEX_NAME;
    /** Elasticsearch index type (default mapping type). */
    public static final String INDEX_TYPE = ElasticSearchInfo.ES_INDEX_TYPE_DEFAULT;
    /** Human-readable Flink job name shown in the cluster UI (user-facing, kept as-is). */
    public static final String JOB_NAME = "告警采集平台——DNS访问黑名单告警";

    public DnsMsg() {}

    /**
     * Builds and submits the DNS blacklist alerting pipeline:
     * Kafka DNS topic → bean mapping → blacklist filter (with WeCom push
     * inside the filter) → normalized JSON string → Elasticsearch sink.
     * Blocks until the job is submitted via {@link FlinkEnv#envExec}.
     */
    public static void execute() {

        // 1. Create the Flink execution environment (checkpointing etc. configured inside FlinkEnv).
        final StreamExecutionEnvironment env = FlinkEnv.getFlinkEnv();

        // 2. Create the Kafka source stream consuming the DNS topic with the dedicated consumer group.
        final KafkaSource<String> kafkaSource = KafkaSourceBuilder.getKafkaSource(
                ConfigurationKey.KAFKA_DNS_TOPIC_NAME,
                ConfigurationKey.KAFKA_DNS_CONSUMER_GROUP_ID);
        final DataStreamSource<String> kafkaDnsMsg = env.fromSource(
                kafkaSource,
                WatermarkStrategy.noWatermarks(),
                "Kafka Source for AlarmPlatform About DNS Topic");

        // 3. Apply the rule pipeline: map raw message to bean, keep only blacklist hits
        //    (the filter also pushes a WeCom alert on match), then normalize back to a string.
        final SingleOutputStreamOperator<String> dnsMsgFilteredBlackListDataStream = kafkaDnsMsg
                .map(new DnsMsgToBeanMapper()).name("映射")
                .filter(new DnsMsgFilter()).name("过滤")
                .map(new DnsMsgToStrMapper()).name("标准化消息体");

        // 4. Sink the qualifying alert messages to Elasticsearch.
        final ElasticsearchSink<String> esSink =
                SinkToEs.getEsSinkBuilder(INDEX_NAME, INDEX_TYPE).build();
        dnsMsgFilteredBlackListDataStream.addSink(esSink);

        // 5. Submit the job graph to the Flink cluster for execution.
        FlinkEnv.envExec(env, JOB_NAME);
    }

}


